Skip to content

rabbit_stream_coordinator: Make new_stream command idempotent#15706

Merged
the-mikedavis merged 1 commit intomainfrom
md/idempotent-new-stream
Mar 18, 2026
Merged

rabbit_stream_coordinator: Make new_stream command idempotent#15706
the-mikedavis merged 1 commit intomainfrom
md/idempotent-new-stream

Conversation

@the-mikedavis
Copy link
Copy Markdown
Collaborator

This is similar to #14884 but for new_stream.

Although its unlikely from the calling code, the stream coordinator can end up with two {new_stream, StreamId, #{}} commands in its log for the same StreamId. When handling the second new_stream it will then warning-log that the stream can't be updated, like so:

2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> rabbit_stream_coordinator failed to update stream:
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> function_clause
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> [{rabbit_stream_coordinator,update_stream0,[#{index => 51,system_time => 1773163898892,reply_mode => await_consensus,machine_version => 6,...},{new_stream,[95|...],#{}},{stream,[...],...}],[{file,[114|...]},{line,1397}]},{rabbit_stream_coordinator,update_stream,3,[{file,[...]},{line,...}]},{rabbit_stream_coordinator,apply,3,[{file,...},{...}]},{ra_server,apply_with,2,[{...}|...]},{ra_log,fold,5,[...]},{ra_server,apply_to,5,...},{ra_server,evaluate_commit_index_follower,...},{ra_server,...}]

update_stream0/3 only has a function clause for when there is no stream (i.e. undefined,

update_stream0(#{system_time := _} = Meta,
{new_stream, StreamId, #{leader_node := LeaderNode,
queue := Q}}, undefined) ->
#{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
%% this jumps straight to the state where all members
%% have been stopped and a new writer has been chosen
E = 1,
QueueRef = amqqueue:get_name(Q),
Members = maps:from_list(
[{N, #member{role = case LeaderNode of
N -> {writer, E};
_ -> {replica, E}
end,
state = {ready, E},
%% no members are running actions
current = undefined}
} || N <- Nodes]),
#stream{id = StreamId,
epoch = E,
nodes = Nodes,
queue_ref = QueueRef,
conf = Conf,
members = Members,
reply_to = maps:get(from, Meta, undefined)};
). I think it's reasonable to have the new_stream command become idempotent and effectively skip it if it is in the log twice.

I haven't been able to reproduce the scenario where this happened. Next time I will dump Khepri and the coordinator's logs to see what sequence of commands lead to this.

@the-mikedavis
Copy link
Copy Markdown
Collaborator Author

I don't think this needs a new machine version since it only affects the reply effects. We could base it on #15695 to take advantage of version 7 though.

@acogoluegnes
Copy link
Copy Markdown
Contributor

LGTM. We can indeed wait for #15695 to be merged and use the v7+ macro in a guard on the new filter_command clause.

@the-mikedavis the-mikedavis marked this pull request as draft March 12, 2026 14:17
@the-mikedavis the-mikedavis force-pushed the md/idempotent-new-stream branch from 7aa9951 to 8f0aac8 Compare March 18, 2026 13:48
@the-mikedavis the-mikedavis marked this pull request as ready for review March 18, 2026 13:51
@acogoluegnes
Copy link
Copy Markdown
Contributor

@the-mikedavis #15695 has been merged, can you rebase and apply this change only after v7?

@the-mikedavis
Copy link
Copy Markdown
Collaborator Author

Yep this is now using the ?V7_OR_MORE macro in the filter_command/3 guard

@the-mikedavis the-mikedavis added this to the 4.4.0 milestone Mar 18, 2026
@the-mikedavis the-mikedavis merged commit 2c142f4 into main Mar 18, 2026
353 of 354 checks passed
@the-mikedavis the-mikedavis deleted the md/idempotent-new-stream branch March 18, 2026 18:21
the-mikedavis added a commit that referenced this pull request Mar 18, 2026
rabbit_stream_coordinator: Make new_stream command idempotent (backport #15706)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants